
Conversation

Contributor

@hsiang-c hsiang-c commented Jul 3, 2025

Which issue does this PR close?

Closes #1685.

Rationale for this change

When we enabled Iceberg Spark tests with Comet-enabled Spark in #1715, we implicitly loaded org.apache.comet.CometSparkSessionExtensions because Iceberg depended on the patched Spark build. This PR explicitly configures every SparkSession.Builder with .config("spark.plugins", "org.apache.spark.CometPlugin") so that we can depend on OSS Spark instead.

Thanks to @andygrove for pointing this out.

What changes are included in this PR?

  1. Depend on OSS Spark instead of a custom build with Spark patches. This saves time because we no longer need to build a custom Spark.
  2. Split the Iceberg Spark tests into 3 actions and run them in parallel. ENABLE_COMET is true for all 3 actions.
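The parallel split described in item 2 could be sketched as a GitHub Actions matrix job. This is an illustrative assumption only: the job name, module names, and the gradlew invocation below are hypothetical, not the exact workflow added in this PR.

```yaml
# Hypothetical sketch: job/module names and the test command are assumptions.
jobs:
  iceberg-spark-tests:
    strategy:
      matrix:
        module: [iceberg-spark, iceberg-spark-extensions, iceberg-spark-runtime]
    runs-on: ubuntu-latest
    env:
      ENABLE_COMET: "true"  # Comet stays enabled for all three parallel jobs
    steps:
      - uses: actions/checkout@v4
      - run: ./gradlew :${{ matrix.module }}:test
```

A matrix keeps the three suites independent, so a failure in one module does not block the others and total wall-clock time drops to the slowest suite.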

In Iceberg 1.8.1.diff, we applied two PRs from Iceberg: apache/iceberg#13786 and apache/iceberg#13793.

Additionally, we temporarily

  1. Disable spark.comet.exec.shuffle.enabled because it breaks several Iceberg Spark tests.
  2. Disable spark.comet.exec.broadcastExchange.enabled because it breaks TestRuntimeFiltering in Iceberg Spark.
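As a plain configuration sketch, the two temporary opt-outs above amount to the settings below. The property keys come from this PR; where they are set (e.g. spark-defaults.conf versus a per-test SparkConf) is an assumption.

```properties
# Temporarily disabled until the Iceberg Spark test failures are resolved
spark.comet.exec.shuffle.enabled=false
spark.comet.exec.broadcastExchange.enabled=false
```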

How are these changes tested?

  1. We enabled Comet in the Iceberg Spark tests of the iceberg-spark, iceberg-spark-extensions, and iceberg-spark-runtime modules.

@hsiang-c hsiang-c changed the title fix: [iceberg] Enable CometShuffleManager in Iceberg Spark tests fix: [iceberg] Switch to OSS Spark and run Iceberg Spark tests in parallel Jul 3, 2025
Contributor

@kazuyukitanimura kazuyukitanimura left a comment


pending CI

.config("spark.sql.legacy.respectNullabilityInTextDatasetConversion", "true")
.config(
SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), String.valueOf(RANDOM.nextBoolean()))
+ .config("spark.plugins", "org.apache.spark.CometPlugin")
Contributor


This makes sense but could be error-prone. If a new test creates its own SparkSession, we'd miss enabling it.
Wondering if there is a good way to update all SparkSessions at once...

Contributor Author


@kazuyukitanimura

We're lucky in some cases because TestBase and ExtensionsTestBase consolidate the SparkSession.Builder in an abstract class.

Unfortunately, other test classes and the jmh benchmarks build their own SparkSession each time :(

Contributor

@kazuyukitanimura kazuyukitanimura left a comment


pending CI

@codecov-commenter

codecov-commenter commented Jul 3, 2025

Codecov Report

❌ Patch coverage is 88.23529% with 2 lines in your changes missing coverage. Please review.
✅ Project coverage is 58.45%. Comparing base (f09f8af) to head (468da8a).
⚠️ Report is 395 commits behind head on main.

Files with missing lines Patch % Lines
...n/scala/org/apache/comet/rules/CometScanRule.scala 88.23% 1 Missing and 1 partial ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##               main    #1987      +/-   ##
============================================
+ Coverage     56.12%   58.45%   +2.32%     
- Complexity      976     1263     +287     
============================================
  Files           119      143      +24     
  Lines         11743    13212    +1469     
  Branches       2251     2360     +109     
============================================
+ Hits           6591     7723    +1132     
- Misses         4012     4264     +252     
- Partials       1140     1225      +85     


@andygrove
Member

andygrove commented Jul 8, 2025

I see that some tests are failing. I didn't run into this specific issue during my testing.

 org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1764.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1764.0 (TID 3312) (localhost executor driver): 
java.lang.ClassCastException: class org.apache.spark.sql.catalyst.expressions.GenericInternalRow cannot be cast to class org.apache.spark.sql.vectorized.ColumnarBatch (org.apache.spark.sql.catalyst.expressions.GenericInternalRow and org.apache.spark.sql.vectorized.ColumnarBatch are in unnamed module of loader 'app')

@hsiang-c
Contributor Author

Most of the exceptions in the Iceberg Spark SQL tests can be reproduced by:

  1. Following the official guide to build Comet and Iceberg, configure the Spark shell, and populate an Iceberg table: https://datafusion.apache.org/comet/user-guide/iceberg.html
  2. Querying an Iceberg metadata table with an operator. Here is an example:
-- default is the catalog name used in local HadoopCatalog setup
scala> spark.sql(s"SELECT COUNT(*) from default.t1.snapshots").show()

25/07/15 13:06:16 ERROR Executor: Exception in task 0.0 in stage 2.0 (TID 2)
java.lang.ClassCastException: class org.apache.iceberg.spark.source.StructInternalRow cannot be cast to class org.apache.spark.sql.vectorized.ColumnarBatch (org.apache.iceberg.spark.source.StructInternalRow is in unnamed module of loader scala.reflect.internal.util.ScalaClassLoader$URLClassLoader @19ac93d2; org.apache.spark.sql.vectorized.ColumnarBatch is in unnamed module of loader 'app')
	at org.apache.spark.sql.comet.CometBatchScanExec$$anon$1.next(CometBatchScanExec.scala:68)
	at org.apache.spark.sql.comet.CometBatchScanExec$$anon$1.next(CometBatchScanExec.scala:57)
	at org.apache.comet.CometBatchIterator.hasNext(CometBatchIterator.java:51)
	at org.apache.comet.Native.executePlan(Native Method)
	at org.apache.comet.CometExecIterator.$anonfun$getNextBatch$2(CometExecIterator.scala:155)
	at org.apache.comet.CometExecIterator.$anonfun$getNextBatch$2$adapted(CometExecIterator.scala:154)
	at org.apache.comet.vector.NativeUtil.getNextBatch(NativeUtil.scala:157)
	at org.apache.comet.CometExecIterator.$anonfun$getNextBatch$1(CometExecIterator.scala:154)
	at org.apache.comet.Tracing$.withTrace(Tracing.scala:31)
	at org.apache.comet.CometExecIterator.getNextBatch(CometExecIterator.scala:152)
	at org.apache.comet.CometExecIterator.hasNext(CometExecIterator.scala:203)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.comet.CometBatchIterator.hasNext(CometBatchIterator.java:50)
	at org.apache.comet.Native.executePlan(Native Method)
	at org.apache.comet.CometExecIterator.$anonfun$getNextBatch$2(CometExecIterator.scala:155)
	at org.apache.comet.CometExecIterator.$anonfun$getNextBatch$2$adapted(CometExecIterator.scala:154)
	at org.apache.comet.vector.NativeUtil.getNextBatch(NativeUtil.scala:157)
	at org.apache.comet.CometExecIterator.$anonfun$getNextBatch$1(CometExecIterator.scala:154)
	at org.apache.comet.Tracing$.withTrace(Tracing.scala:31)
	at org.apache.comet.CometExecIterator.getNextBatch(CometExecIterator.scala:152)
	at org.apache.comet.CometExecIterator.hasNext(CometExecIterator.scala:203)
	at org.apache.spark.sql.comet.execution.shuffle.CometNativeShuffleWriter.write(CometNativeShuffleWriter.scala:106)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:840)

@parthchandra
Contributor

@hsiang-c created #2033 to track this issue

@hsiang-c hsiang-c force-pushed the enable_comet_shuffle branch from f5b7329 to 5c27644 Compare August 8, 2025 00:28
@hsiang-c hsiang-c force-pushed the enable_comet_shuffle branch from 5c27644 to 2056043 Compare August 12, 2025 20:07
Comment on lines 563 to 567
<<<<<<< Updated upstream
index 2c37a52..503dbd6 100644
=======
index 2c37a52..3442cfc 100644
>>>>>>> Stashed changes
Member


@hsiang-c It looks like a merge conflict in the diff file

Contributor Author


Sorry about that, fixed now.

@hsiang-c hsiang-c force-pushed the enable_comet_shuffle branch from a3877c3 to 2a13941 Compare August 19, 2025 16:30
Member

@andygrove andygrove left a comment


Thanks @hsiang-c!

@andygrove andygrove merged commit 3a498c4 into apache:main Aug 19, 2025
96 checks passed
@hsiang-c hsiang-c deleted the enable_comet_shuffle branch August 19, 2025 21:07